package com.hbase.dao.resolve;

import java.lang.reflect.Field;
import java.util.Date;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowLock;
import org.apache.hadoop.hbase.util.Bytes;

import com.hbase.dao.annotation.HBaseColumn;
import com.hbase.dao.exception.HBaseDaoException;
import com.hbase.dao.util.CheckUtil;
import com.hbase.dao.util.FieldUtil;

/**
 * Static helpers that convert between HBase client objects and annotated
 * entity objects: a {@link Result} is mapped onto a new entity instance, and
 * an entity is mapped onto a {@link Put}.
 * <p>
 * Column metadata is read from the {@link HBaseColumn} annotation of every
 * field returned by
 * {@code HBaseTableEntityAnnotation#gethBaseColumnFieldList()}. This class
 * only exposes static methods and cannot be instantiated.
 * 
 * @author yangshenhui
 * @version 1.0
 */
// NOTE(review): the suppression presumably covers the RowLock-based Put API
// used below -- confirm against the HBase client version in use.
@SuppressWarnings("deprecation")
public final class HBaseEntityResolve {

	/**
	 * Creates a new instance of {@code clazz} and fills it from {@code result}.
	 * <p>
	 * The row key of the result is handed to
	 * {@code hBaseTableEntityAnnotation.setRowKey}; afterwards, for every
	 * column field, the cell addressed by the field's family and column name
	 * is looked up in {@code result} and passed to
	 * {@code FieldUtil.setFieldValue}.
	 * 
	 * @param result
	 *            the HBase row to read from
	 * @param clazz
	 *            the entity type to instantiate through its no-argument
	 *            constructor
	 * @param hBaseTableEntityAnnotation
	 *            the mapping metadata of the entity type
	 * @return the populated entity instance
	 * @throws HBaseDaoException
	 *             if the entity cannot be instantiated reflectively (including
	 *             a constructor that throws), or if a field has no column
	 *             family
	 */
	public static <T> T resolve(Result result, Class<T> clazz,
			HBaseTableEntityAnnotation hBaseTableEntityAnnotation)
			throws HBaseDaoException {
		CheckUtil.checkNull(result);
		CheckUtil.checkNull(clazz);
		CheckUtil.checkNull(hBaseTableEntityAnnotation);
		try {
			// Class#newInstance() would let checked exceptions thrown by the
			// constructor escape undeclared; going through the Constructor
			// wraps them in InvocationTargetException, which is handled below.
			T t = clazz.getDeclaredConstructor().newInstance();
			hBaseTableEntityAnnotation.setRowKey(t, result.getRow());
			for (Field field : hBaseTableEntityAnnotation
					.gethBaseColumnFieldList()) {
				HBaseColumn hBaseColumn = field
						.getAnnotation(HBaseColumn.class);
				String family = getFamily(hBaseTableEntityAnnotation,
						hBaseColumn.family(), field);
				String column = getColumn(hBaseColumn, field);
				// NOTE(review): value is presumably null when the row has no
				// such cell -- confirm FieldUtil.setFieldValue tolerates null.
				byte[] value = result.getValue(Bytes.toBytes(family),
						Bytes.toBytes(column));
				FieldUtil.setFieldValue(t, field, value);
			}
			return t;
		} catch (ReflectiveOperationException e) {
			throw new HBaseDaoException(e);
		}
	}

	/**
	 * Builds a {@link Put} for the entity {@code t}.
	 * <p>
	 * The row key is obtained from
	 * {@code hBaseTableEntityAnnotation.getRowKey(t)}; every non-null column
	 * field of the entity contributes one value to the put.
	 * 
	 * @param t
	 *            the entity to persist
	 * @param rowLock
	 *            optional row lock; when it is null the put is created without
	 *            a lock
	 * @param hBaseTableEntityAnnotation
	 *            the mapping metadata of the entity type
	 * @return the put holding the entity's column values
	 * @throws HBaseDaoException
	 *             if the entity has no non-null column value, or if a field has
	 *             no column family
	 */
	public static <T> Put resolve(T t, RowLock rowLock,
			HBaseTableEntityAnnotation hBaseTableEntityAnnotation)
			throws HBaseDaoException {
		CheckUtil.checkNull(t);
		CheckUtil.checkNull(hBaseTableEntityAnnotation);
		Put put = CheckUtil.isNotNull(rowLock)
				? new Put(hBaseTableEntityAnnotation.getRowKey(t), rowLock)
				: new Put(hBaseTableEntityAnnotation.getRowKey(t));
		createPutValues(t, hBaseTableEntityAnnotation, put);
		return put;
	}

	/**
	 * Adds the values of all column fields of {@code t} to {@code put} and
	 * rejects the operation when nothing was added.
	 * 
	 * @throws HBaseDaoException
	 *             if no field contributed a value, or if a field has no column
	 *             family
	 */
	private static <T> void createPutValues(T t,
			HBaseTableEntityAnnotation hBaseTableEntityAnnotation, Put put)
			throws HBaseDaoException {
		CheckUtil.checkNull(t);
		CheckUtil.checkNull(hBaseTableEntityAnnotation);
		CheckUtil.checkNull(put);
		int keyValueCount = 0;
		for (Field field : hBaseTableEntityAnnotation.gethBaseColumnFieldList()) {
			keyValueCount += processHBaseColumnFields(t,
					hBaseTableEntityAnnotation, put, field);
		}
		if (keyValueCount == 0) {
			throw new HBaseDaoException(
					"there is no values to do in this operation.");
		}
	}

	/**
	 * Adds the value of a single column field of {@code t} to {@code put}.
	 * <p>
	 * A null field value is skipped. A {@link Date} is replaced by its
	 * {@link Date#getTime()} value first; every value is then written as the
	 * bytes of its {@code toString()} representation.
	 * 
	 * @return 1 if a value was added to the put, 0 if the field value was null
	 * @throws HBaseDaoException
	 *             if the field has no column family
	 */
	private static <T> int processHBaseColumnFields(T t,
			HBaseTableEntityAnnotation hBaseTableEntityAnnotation, Put put,
			Field field) throws HBaseDaoException {
		CheckUtil.checkNull(t);
		CheckUtil.checkNull(hBaseTableEntityAnnotation);
		CheckUtil.checkNull(put);
		CheckUtil.checkNull(field);
		HBaseColumn hBaseColumn = field.getAnnotation(HBaseColumn.class);
		String column = getColumn(hBaseColumn, field);
		Object value = FieldUtil.getFieldValue(t, field);
		if (CheckUtil.isNull(value)) {
			return 0;
		}
		if (value instanceof Date) {
			// Dates are stored as their epoch-millisecond number.
			value = ((Date) value).getTime();
		}
		String family = getFamily(hBaseTableEntityAnnotation,
				hBaseColumn.family(), field);
		put.add(Bytes.toBytes(family), Bytes.toBytes(column),
				Bytes.toBytes(value.toString()));
		return 1;
	}

	/**
	 * Returns the column qualifier for {@code field}: the name declared on the
	 * annotation, or the Java field name when the annotation leaves it empty.
	 */
	private static String getColumn(HBaseColumn hBaseColumn, Field field) {
		String column = hBaseColumn.column();
		return StringUtils.isEmpty(column) ? field.getName() : column;
	}

	/**
	 * Determines the column family for {@code field}.
	 * <p>
	 * The family given on the column annotation wins; when it is empty the
	 * family declared on the table annotation is used instead.
	 * 
	 * @param hBaseTableEntityAnnotation
	 *            the mapping metadata providing the table-level family
	 * @param family
	 *            the family declared on the field's column annotation, possibly
	 *            empty
	 * @param field
	 *            the field being mapped; only used for the error message
	 * @return the non-empty family name
	 * @throws HBaseDaoException
	 *             if neither the column nor the table declares a family
	 */
	private static String getFamily(
			HBaseTableEntityAnnotation hBaseTableEntityAnnotation,
			String family, Field field) throws HBaseDaoException {
		CheckUtil.checkNull(hBaseTableEntityAnnotation);
		CheckUtil.checkNull(field);
		String resolvedFamily = family;
		if (StringUtils.isEmpty(resolvedFamily)) {
			resolvedFamily = hBaseTableEntityAnnotation.getHBaseTable()
					.family();
		}
		if (StringUtils.isEmpty(resolvedFamily)) {
			throw new HBaseDaoException(field.getName()
					+ " does not define a family.");
		}
		return resolvedFamily;
	}

	/** Not instantiable: the class only offers static helpers. */
	private HBaseEntityResolve() {
	}

}